package com.atguigu.chapter06;

import com.atguigu.chapter02.Event;
import com.atguigu.chapter05.Clicksource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Example job that counts views per URL over sliding event-time windows.
 *
 * <p>Events are read from {@link Clicksource}, keyed by {@code url}, and assigned to
 * 10-second windows that slide every 5 seconds. Counting is done incrementally by
 * {@link UrlViewCountAgg}; {@link UrlViewCountResult} then attaches the key and the
 * window bounds and emits a {@code UrlViewCount}, which is printed to stdout.
 */
public class UrlViewCountExample {

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Zero out-of-orderness bound: the watermark strategy tolerates no lateness.
        // The event's own timestamp field is used as the event time
        // (presumably epoch milliseconds - confirm against Event).
        WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);

        SingleOutputStreamOperator<Event> stream = env
                .addSource(new Clicksource())
                .assignTimestampsAndWatermarks(watermarkStrategy);

        stream.keyBy(event -> event.url)
                // Window size 10s, slide 5s.
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                // Incremental count first, then wrap the single result with window metadata.
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult())
                .print();

        env.execute();
    }

    /**
     * Incremental aggregate that counts events.
     *
     * <p>Type parameters are, in order: input ({@code Event}), accumulator ({@code Long})
     * and output ({@code Long}).
     */
    private static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {

        /** Starts every count at zero. */
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        /** Adds one to the running count; the event's content is not inspected. */
        @Override
        public Long add(Event event, Long accumulator) {
            return accumulator + 1L;
        }

        /** The accumulator already is the final count. */
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial counts by summing them.
         *
         * <p>The sliding windows used in {@code main} are not merging windows, but returning
         * the sum (instead of {@code null}) keeps this function correct if it is ever reused
         * with a merging window assigner such as session windows.
         */
        @Override
        public Long merge(Long left, Long right) {
            return left + right;
        }
    }

    /**
     * Wraps the pre-aggregated count for one key and window into a {@code UrlViewCount}
     * carrying the URL, the count, and the window's start and end timestamps.
     */
    private static class UrlViewCountResult
            extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {

        /**
         * Emits one {@code UrlViewCount} for the given key and window.
         *
         * @param url      the key of the window (the URL the stream was keyed by)
         * @param context  gives access to the window whose bounds are reported
         * @param counts   output of the incremental aggregate; only the first element is read
         * @param out      collector receiving the resulting record
         */
        @Override
        public void process(String url, Context context, Iterable<Long> counts,
                            Collector<UrlViewCount> out) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();

            // Only the first element is consumed: it is the result of UrlViewCountAgg.
            Long count = counts.iterator().next();

            out.collect(new UrlViewCount(url, count, start, end));
        }
    }
}
